package com.peng.storm;


import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

/**
 * Storm demo: wires a three-stage topology ({@code word} spout -> {@code exclaim} bolt
 * -> {@code print} bolt) and runs it either on a remote cluster or in an in-process
 * {@link LocalCluster}.
 *
 * @author wangpeng
 * @create 2018-04-11 15:36
 **/
public class HelloWorld {

    /** Name under which the topology is registered when running in local mode. */
    private static final String LOCAL_TOPOLOGY_NAME = "test3";

    /** How long (in milliseconds) the local topology is left running before it is killed. */
    private static final long LOCAL_RUN_MILLIS = 20000L;

    /**
     * Builds the demo topology and submits it.
     *
     * <p>If at least one argument is given, {@code args[0]} is used as the topology name and
     * the topology is submitted through {@link StormSubmitter} with three workers. Otherwise
     * the topology runs in a {@link LocalCluster} for {@value #LOCAL_RUN_MILLIS} ms and is
     * then killed; the local cluster is always shut down afterwards.
     *
     * @param args optional; {@code args[0]} is the topology name for a remote submission
     * @throws Exception if building, submitting, or stopping the topology fails
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Spout: the message source of the topology (parallelism hint 1).
        builder.setSpout("word", new TestWordSpout(), 1);

        // Bolts: the consumers. Each bolt subscribes to the output of the component whose
        // id is passed to shuffleGrouping, declared here in data-flow order:
        // word -> exclaim -> print.
        builder.setBolt("exclaim", new ExclamationBolt(), 1).shuffleGrouping("word");
        builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("exclaim");

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // Remote submission: the first argument names the topology.
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            // Local submission: run inside this JVM for a fixed time, then tear down.
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, builder.createTopology());
                Utils.sleep(LOCAL_RUN_MILLIS);
                cluster.killTopology(LOCAL_TOPOLOGY_NAME);
            } finally {
                // Always release the in-process cluster, even if submit/kill failed.
                cluster.shutdown();
            }
        }
    }
}
